/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.namenode.NameNode;

/**
 * Represents a block that is currently being constructed.<br>
 * This is usually the last block of a file opened for write or append.
 * <p>
 * HDFS在加载fsimage时，如果当前加载的文件处于正在构建状态，也就是INodeFile中包含FileUnderConstructionFeature特性，
 * 则将该INodeFile的最后一个数据块设置为BlockInfoUnderConstruction，表明最后一个数据块正在构建中。
 * 这里要注意处于构建状态的INodeFile除最后一个数据块为BlockInfoUnderConstruction外，其他均为正常的BlockInfo。
 */
public class BlockInfoUnderConstruction extends BlockInfo {
    /**
     * Block state. See {@link BlockUCState}
     */
    private BlockUCState blockUCState;

    /**
     * Block replicas as assigned when the block was allocated.
     * This defines the pipeline order.
     */
    private List<ReplicaUnderConstruction> replicas;

    /**
     * Index of the primary data node doing the recovery. Useful for log
     * messages.
     */
    private int primaryNodeIndex = -1;

    /**
     * The new generation stamp, which this block will have
     * after the recovery succeeds. Also used as a recovery id to identify
     * the right recovery if any of the abandoned recoveries re-appear.
     */
    private long blockRecoveryId = 0;

    /**
     * ReplicaUnderConstruction contains information about replicas while
     * they are under construction.
     * The GS, the length and the state of the replica is as reported by
     * the data-node.
     * It is not guaranteed, but expected, that data-nodes actually have
     * corresponding replicas.
     */
    static class ReplicaUnderConstruction extends Block {
        private final DatanodeStorageInfo expectedLocation;
        private ReplicaState state;
        private boolean chosenAsPrimary;

        ReplicaUnderConstruction(Block block,
                                 DatanodeStorageInfo target,
                                 ReplicaState state) {
            super(block);
            this.expectedLocation = target;
            this.state = state;
            this.chosenAsPrimary = false;
        }

        /**
         * Expected block replica location as assigned when the block was allocated.
         * This defines the pipeline order.
         * It is not guaranteed, but expected, that the data-node actually has
         * the replica.
         */
        private DatanodeStorageInfo getExpectedStorageLocation() {
            return expectedLocation;
        }

        /**
         * Get replica state as reported by the data-node.
         */
        ReplicaState getState() {
            return state;
        }

        /**
         * Whether the replica was chosen for recovery.
         */
        boolean getChosenAsPrimary() {
            return chosenAsPrimary;
        }

        /**
         * Set replica state.
         */
        void setState(ReplicaState s) {
            state = s;
        }

        /**
         * Set whether this replica was chosen for recovery.
         */
        void setChosenAsPrimary(boolean chosenAsPrimary) {
            this.chosenAsPrimary = chosenAsPrimary;
        }

        /**
         * Is data-node the replica belongs to alive.
         */
        boolean isAlive() {
            return expectedLocation.getDatanodeDescriptor().isAlive;
        }

        @Override // Block
        public int hashCode() {
            return super.hashCode();
        }

        @Override // Block
        public boolean equals(Object obj) {
            // Sufficient to rely on super's implementation
            return (this == obj) || super.equals(obj);
        }

        @Override
        public String toString() {
            final StringBuilder b = new StringBuilder(50);
            appendStringTo(b);
            return b.toString();
        }

        @Override
        public void appendStringTo(StringBuilder sb) {
            sb.append("ReplicaUnderConstruction[")
                    .append(expectedLocation)
                    .append("|")
                    .append(state)
                    .append("]");
        }
    }

    /**
     * Create block and set its state to
     * {@link BlockUCState#UNDER_CONSTRUCTION}.
     */
    public BlockInfoUnderConstruction(Block blk, int replication) {
        this(blk, replication, BlockUCState.UNDER_CONSTRUCTION, null);
    }

    /**
     * Create a block that is currently being constructed.
     */
    public BlockInfoUnderConstruction(Block blk, int replication,
                                      BlockUCState state,
                                      DatanodeStorageInfo[] targets) {
        super(blk, replication);
        assert getBlockUCState() != BlockUCState.COMPLETE :
                "BlockInfoUnderConstruction cannot be in COMPLETE state";
        this.blockUCState = state;
        setExpectedLocations(targets);
    }

    /**
     * Convert an under construction block to a complete block.
     *
     * @return BlockInfo - a complete block.
     * @throws IOException if the state of the block
     *                     (the generation stamp and the length) has not been committed by
     *                     the client or it does not have at least a minimal number of replicas
     *                     reported from data-nodes.
     */
    BlockInfo convertToCompleteBlock() throws IOException {
        assert getBlockUCState() != BlockUCState.COMPLETE :
                "Trying to convert a COMPLETE block";
        return new BlockInfo(this);
    }

    /**
     * Set expected locations
     */
    public void setExpectedLocations(DatanodeStorageInfo[] targets) {
        int numLocations = targets == null ? 0 : targets.length;
        this.replicas = new ArrayList<ReplicaUnderConstruction>(numLocations);
        for (int i = 0; i < numLocations; i++)
            replicas.add(
                    new ReplicaUnderConstruction(this, targets[i], ReplicaState.RBW));
    }

    /**
     * Create array of expected replica locations
     * (as has been assigned by chooseTargets()).
     */
    public DatanodeStorageInfo[] getExpectedStorageLocations() {
        int numLocations = replicas == null ? 0 : replicas.size();
        DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations];
        for (int i = 0; i < numLocations; i++)
            storages[i] = replicas.get(i).getExpectedStorageLocation();
        return storages;
    }

    /**
     * Get the number of expected locations
     */
    public int getNumExpectedLocations() {
        return replicas == null ? 0 : replicas.size();
    }

    /**
     * Return the state of the block under construction.
     *
     * @see BlockUCState
     */
    @Override // BlockInfo
    public BlockUCState getBlockUCState() {
        return blockUCState;
    }

    void setBlockUCState(BlockUCState s) {
        blockUCState = s;
    }

    /**
     * Get block recovery ID
     */
    public long getBlockRecoveryId() {
        return blockRecoveryId;
    }

    /**
     * Process the recorded replicas. When about to commit or finish the
     * pipeline recovery sort out bad replicas.
     *
     * @param genStamp The final generation stamp for the block.
     */
    public void setGenerationStampAndVerifyReplicas(long genStamp) {
        // Set the generation stamp for the block.
        setGenerationStamp(genStamp);
        if (replicas == null)
            return;

        // Remove the replicas with wrong gen stamp.
        // The replica list is unchanged.
        for (ReplicaUnderConstruction r : replicas) {
            if (genStamp != r.getGenerationStamp()) {
                r.getExpectedStorageLocation().removeBlock(this);
                NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
                        + "from location: " + r.getExpectedStorageLocation());
            }
        }
    }

    /**
     * Commit block's length and generation stamp as reported by the client.
     * Set block state to {@link BlockUCState#COMMITTED}.
     *
     * @param block - contains client reported block length and generation
     * @throws IOException if block ids are inconsistent.
     */
    void commitBlock(Block block) throws IOException {
        if (getBlockId() != block.getBlockId())
            throw new IOException("Trying to commit inconsistent block: id = "
                    + block.getBlockId() + ", expected id = " + getBlockId());
        blockUCState = BlockUCState.COMMITTED;
        this.setNumBytes(block.getNumBytes());
        // Sort out invalid replicas.
        setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
    }

    /**
     * Initialize lease recovery for this block.
     * Find the first alive data-node starting from the previous primary and
     * make it primary.
     */
    public void initializeBlockRecovery(long recoveryId) {
        setBlockUCState(BlockUCState.UNDER_RECOVERY);
        blockRecoveryId = recoveryId;
        if (replicas.size() == 0) {
            NameNode.blockStateChangeLog.warn("BLOCK*"
                    + " BlockInfoUnderConstruction.initLeaseRecovery:"
                    + " No blocks found, lease removed.");
        }
        boolean allLiveReplicasTriedAsPrimary = true;
        for (int i = 0; i < replicas.size(); i++) {
            // Check if all replicas have been tried or not.
            if (replicas.get(i).isAlive()) {
                allLiveReplicasTriedAsPrimary =
                        (allLiveReplicasTriedAsPrimary && replicas.get(i).getChosenAsPrimary());
            }
        }
        if (allLiveReplicasTriedAsPrimary) {
            // Just set all the replicas to be chosen whether they are alive or not.
            for (int i = 0; i < replicas.size(); i++) {
                replicas.get(i).setChosenAsPrimary(false);
            }
        }
        long mostRecentLastUpdate = 0;
        ReplicaUnderConstruction primary = null;
        primaryNodeIndex = -1;
        for (int i = 0; i < replicas.size(); i++) {
            // Skip alive replicas which have been chosen for recovery.
            if (!(replicas.get(i).isAlive() && !replicas.get(i).getChosenAsPrimary())) {
                continue;
            }
            final ReplicaUnderConstruction ruc = replicas.get(i);
            final long lastUpdate = ruc.getExpectedStorageLocation().getDatanodeDescriptor().getLastUpdate();
            if (lastUpdate > mostRecentLastUpdate) {
                primaryNodeIndex = i;
                primary = ruc;
                mostRecentLastUpdate = lastUpdate;
            }
        }
        if (primary != null) {
            primary.getExpectedStorageLocation().getDatanodeDescriptor().addBlockToBeRecovered(this);
            primary.setChosenAsPrimary(true);
            NameNode.blockStateChangeLog.info("BLOCK* " + this
                    + " recovery started, primary=" + primary);
        }
    }

    void addReplicaIfNotPresent(DatanodeStorageInfo storage,
                                Block block,
                                ReplicaState rState) {
        Iterator<ReplicaUnderConstruction> it = replicas.iterator();
        while (it.hasNext()) {
            ReplicaUnderConstruction r = it.next();
            DatanodeStorageInfo expectedLocation = r.getExpectedStorageLocation();
            if (expectedLocation == storage) {
                // Record the gen stamp from the report
                r.setGenerationStamp(block.getGenerationStamp());
                return;
            } else if (expectedLocation != null &&
                    expectedLocation.getDatanodeDescriptor() ==
                            storage.getDatanodeDescriptor()) {

                // The Datanode reported that the block is on a different storage
                // than the one chosen by BlockPlacementPolicy. This can occur as
                // we allow Datanodes to choose the target storage. Update our
                // state by removing the stale entry and adding a new one.
                it.remove();
                break;
            }
        }
        replicas.add(new ReplicaUnderConstruction(block, storage, rState));
    }

    @Override // BlockInfo
    // BlockInfoUnderConstruction participates in maps the same way as BlockInfo
    public int hashCode() {
        return super.hashCode();
    }

    @Override // BlockInfo
    public boolean equals(Object obj) {
        // Sufficient to rely on super's implementation
        return (this == obj) || super.equals(obj);
    }

    @Override
    public String toString() {
        final StringBuilder b = new StringBuilder(100);
        appendStringTo(b);
        return b.toString();
    }

    @Override
    public void appendStringTo(StringBuilder sb) {
        super.appendStringTo(sb);
        appendUCParts(sb);
    }

    private void appendUCParts(StringBuilder sb) {
        sb.append("{blockUCState=").append(blockUCState)
                .append(", primaryNodeIndex=").append(primaryNodeIndex)
                .append(", replicas=[");
        if (replicas != null) {
            Iterator<ReplicaUnderConstruction> iter = replicas.iterator();
            if (iter.hasNext()) {
                iter.next().appendStringTo(sb);
                while (iter.hasNext()) {
                    sb.append(", ");
                    iter.next().appendStringTo(sb);
                }
            }
        }
        sb.append("]}");
    }
}
